View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsConnectionConsumer.java,v 1.2 2005/03/18 03:36:37 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import javax.jms.Connection;
48  import javax.jms.ConnectionConsumer;
49  import javax.jms.Destination;
50  import javax.jms.JMSException;
51  import javax.jms.Message;
52  import javax.jms.MessageConsumer;
53  import javax.jms.MessageListener;
54  import javax.jms.ServerSession;
55  import javax.jms.ServerSessionPool;
56  import javax.jms.Session;
57  import javax.jms.Topic;
58  
59  import org.apache.commons.logging.Log;
60  import org.apache.commons.logging.LogFactory;
61  
62  
63  /***
64   * Implementation of the <code>javax.jms.ConnectionConsumer</code> interface.
65   *
66   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
67   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
68   * @version $Revision: 1.2 $ $Date: 2005/03/18 03:36:37 $
69   */
70  class JmsConnectionConsumer
71          implements ConnectionConsumer, MessageListener {
72  
73      /***
74       * The session to receive messages via.
75       */
76      private Session _session;
77  
78      /***
79       * The consumer of messages.
80       */
81      private MessageConsumer _consumer;
82  
83      /***
84       * The server session pool.
85       */
86      private ServerSessionPool _pool;
87  
88      /***
89       * The logger
90       */
91      private static final Log _log =
92              LogFactory.getLog(JmsConnectionConsumer.class);
93  
94  
95      /***
96       * Construct a new <code>JmsConnectionConsumer</code>.
97       *
98       * @param connection       the connection which created this
99       * @param destination      the destination to access
100      * @param pool             the server session pool
101      * @param selector         the message selector. May be <code>null</code>
102      * @param maxMessages      the maximum number of messages that can be
103      *                         assigned to a server session at one time
104      * @throws JMSException if the consumer cannot be constructed
105      */
106     public JmsConnectionConsumer(Connection connection, Destination destination,
107                                  ServerSessionPool pool, String selector,
108                                  int maxMessages)
109             throws JMSException {
110         this(connection, destination, null, pool, selector, maxMessages);
111     }
112 
113     /***
114      * Construct a new <code>JmsConnectionConsumer</code>.
115      *
116      * @param connection       the connection which created this
117      * @param destination      the destination to access
118      * @param subscriptionName the durable subscription name. May be
119      *                         <code>null</code>
120      * @param pool             the server session pool
121      * @param selector         the message selector. May be <code>null</code>
122      * @param maxMessages      the maximum number of messages that can be
123      *                         assigned to a server session at one time
124      * @throws JMSException if the consumer cannot be constructed
125      */
126     public JmsConnectionConsumer(Connection connection, Destination destination,
127                                  String subscriptionName,
128                                  ServerSessionPool pool, String selector,
129                                  int maxMessages)
130             throws JMSException {
131         if (connection == null) {
132             throw new IllegalArgumentException("Argument 'connection' is null");
133         }
134         if (destination == null) {
135             throw new IllegalArgumentException(
136                     "Argument 'destination' is null");
137         }
138         if (pool == null) {
139             throw new IllegalArgumentException("Argument 'pool' is null");
140         }
141         if (maxMessages <= 0) {
142             throw new IllegalArgumentException(
143                     "Argument 'maxMessages' must be > 0");
144         }
145 
146         _pool = pool;
147 
148         _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
149         if (subscriptionName == null) {
150             _consumer = _session.createConsumer(destination, selector, false);
151         } else {
152             _consumer = _session.createDurableSubscriber((Topic) destination,
153                                                          subscriptionName,
154                                                          selector, false);
155         }
156 
157         _consumer.setMessageListener(this);
158     }
159 
160     /***
161      * Returns the server session pool associated with this connection consumer.
162      *
163      * @return the server session pool used by this connection consumer
164      */
165     public ServerSessionPool getServerSessionPool() {
166         return _pool;
167     }
168 
169     /***
170      * Close the connection consumer, freeing any allocated resources.
171      *
172      * @throws JMSException if the consumer cannot be closed
173      */
174     public void close() throws JMSException {
175         try {
176             _consumer.close();
177             _session.close();
178         } finally {
179             _pool = null;
180             _consumer = null;
181             _session = null;
182         }
183     }
184 
185     /***
186      * Impmentation of MessageListener.onMessage, to receive messages
187      * from the server. In this most simple case, it loads each message into a
188      * server session and calls the start method.
189      *
190      * @param message the message
191      */
192     public void onMessage(Message message) {
193         try {
194             // not very sophisticated at this point. Simply get a session
195             // from the pool, put the message in it, and start it.
196             ServerSession serverSession = _pool.getServerSession();
197             JmsSession session = (JmsSession) serverSession.getSession();
198             message.acknowledge();
199             session.addMessage(message);
200             serverSession.start();
201         } catch (Exception exception) {
202             _log.error(exception, exception);
203         }
204     }
205 
206 }